Redshiftに対してのUPDATEとDELETEを実施してみた | Luigi Advent Calendar 2016 #16
はじめに
好物はインフラとフロントエンドのかじわらゆたかです。
このエントリは『Luigi Advent Calendar 2016』16日目の内容となります。
先日15日目はRedshiftからUnloadを用いてデータ取得してみたでした。
前回まででSELECT UNLOADを試しました。
UPDATE/DELETEといった、それ以外のクエリの書き方等を調べてみたいと思います。
その中で、Luigiでのトランザクションの用い方等も調べていきたいと思います。
下準備
以下のテストデータとそれを取り込めるテーブルを用意して、データは読み込ませておきます。
1 Customer#000000001 IVhzIApeRb MOROCCO 0 MOROCCO AFRICA 25-989-741-2988 BUILDING 2 Customer#000000002 XSTf4,NCwDVaWNe6tE JORDAN 1 JORDAN MIDDLE EAST 23-768-687-3665 AUTOMOBILE 3 Customer#000000003 MG9kdTD ARGENTINA7 ARGENTINA AMERICA 11-719-748-3364 AUTOMOBILE 4 Customer#000000004 XxVSJsL EGYPT 4 EGYPT MIDDLE EAST 14-128-190-5944 MACHINERY 5 Customer#000000005 KvpyuHCplrB84WgAi CANADA 5 CANADA AMERICA 13-750-942-6364 HOUSEHOLD 6 Customer#000000006 sKZz0CsnMD7mp4Xd0YrBvx SAUDI ARA2 SAUDI ARABIA MIDDLE EAST 30-114-968-4951 AUTOMOBILE 7 Customer#000000007 TcGe5gaZNgVePxU5kR CHINA 0 CHINA ASIA 28-190-982-9759 AUTOMOBILE 8 Customer#000000008 I0B10bB0AymmC, 0PrRYBC PERU 6 PERU AMERICA 27-147-574-9335 BUILDING 9 Customer#000000009 xKiAFTjUsCuxfele INDIA 6 INDIA ASIA 18-338-906-3675 FURNITURE 10 Customer#000000010 6LrEaV6KR6PLVcgl2ArL ETHIOPIA 9 ETHIOPIA AFRICA 15-741-346-9870 HOUSEHOL
CREATE TABLE 顧客情報 ( 顧客ID INTEGER NOT NULL, 氏名 VARCHAR(25) NOT NULL, 住所 VARCHAR(25) NOT NULL, 市 VARCHAR(10) NOT NULL, 国 VARCHAR(15) NOT NULL, 地域 VARCHAR(12) NOT NULL, 電話番号 VARCHAR(15) NOT NULL, 市場区分 VARCHAR(10) NOT NULL ); copy 顧客情報 from 's3://cm-kajiwara-redshift-copy/customer0.tsv' CREDENTIALS 'aws_iam_role=arn:aws:iam::nnnnnnnnnnnn:role/redshift-role-mmmmmmmmm' DELIMITER AS '\t' TIMEFORMAT AS 'auto' region AS 'ap-northeast-1';;
UPDATE / DELETE文の実行
こちらはSELECTの時と同様にconnectionからqueryメソッドをもちいることで、実行可能です。
SELECTの時と異なる点としては、RedshiftTargetが実際に用いているpsycopg2の実装はクエリ発行時にトランザクションを貼っているため、
明示的にcommitをする必要があると言った点です。commitをしないともちろん、データは反映されません。
import luigi import luigi.contrib.redshift class updateRedshiftTable(luigi.Task): host = "[Redshift Host]" database = "[Redshift database]" password = "[Redshift password]" user = "[Redshift user]" table = "顧客情報" def run(self): output = self.output() connection = output.connect() cursor = connection.cursor(cursor_factory=psycopg2.extras.DictCursor) try: cursor.execute("update {} SET 氏名 = 'UpdateCustomer#000000002' WHERE 顧客ID=1".format(self.table)) output.touch(connection) connection.commit() # commit and clean up connection.close() except: logger.warning("UPDATE query error") raise def output(self): return luigi.contrib.redshift.RedshiftTarget( host=self.host, database=self.database, password=self.password, user=self.user, table=self.table, update_id="updateRedshiftTable")
import luigi import luigi.contrib.redshift class deleteRedshiftTable(luigi.Task): host = "[Redshift Host]" database = "[Redshift database]" password = "[Redshift password]" user = "[Redshift user]" table = "顧客情報" def run(self): output = self.output() connection = output.connect() cursor = connection.cursor(cursor_factory=psycopg2.extras.DictCursor) try: cursor.execute("delete from {} WHERE 顧客ID=2".format(self.table)) output.touch(connection) connection.commit() # commit and clean up connection.close() except: logger.warning("DELETE query error") raise def output(self): return luigi.contrib.redshift.RedshiftTarget( host=self.host, database=self.database, password=self.password, user=self.user, table=self.table, update_id="deleteRedshiftTable")
実行結果
顧客id | 氏名 | 住所 | 市 | 国 | 地域 | 電話番号 | 市場区分 |
---|---|---|---|---|---|---|---|
1 | UpdateCustomer#000000002 | IVhzIApeRb | MOROCCO 0 | MOROCCO | AFRICA | 25-989-741-2988 | BUILDING |
3 | Customer#000000003 | MG9kdTD | ARGENTINA7 | ARGENTINA | AMERICA | 11-719-748-3364 | AUTOMOBILE |
4 | Customer#000000004 | XxVSJsL | EGYPT 4 | EGYPT | MIDDLE EAST | 14-128-190-5944 | MACHINERY |
5 | Customer#000000005 | KvpyuHCplrB84WgAi | CANADA 5 | CANADA | AMERICA | 13-750-942-6364 | HOUSEHOLD |
6 | Customer#000000006 | sKZz0CsnMD7mp4Xd0YrBvx | SAUDI ARA2 | SAUDI ARABIA | MIDDLE EAST | 30-114-968-4951 | AUTOMOBILE |
7 | Customer#000000007 | TcGe5gaZNgVePxU5kR | CHINA 0 | CHINA | ASIA | 28-190-982-9759 | AUTOMOBILE |
8 | Customer#000000008 | I0B10bB0AymmC, 0PrRYBC | PERU 6 | PERU | AMERICA | 27-147-574-9335 | BUILDING |
9 | Customer#000000009 | xKiAFTjUsCuxfele | INDIA 6 | INDIA | ASIA | 18-338-906-3675 | FURNITURE |
10 | Customer#000000010 | 6LrEaV6KR6PLVcgl2ArL | ETHIOPIA 9 | ETHIOPIA | AFRICA | 15-741-346-9870 | HOUSEHOLD |
顧客id 1 の氏名が更新されていて、顧客id 2のレコードが消えていることがわかります。
トランザクションのロールバック
先ほども書きましたが、LuigiのRedshiftモジュールがラップしているpsycopg2はデフォルトでトランザクションを貼っています。
今度は更新・削除を実施後、結果がロールバックしていることを確認してみたいと思います。
import luigi import luigi.contrib.redshift class rollbackRedshiftTable(luigi.Task): host = "[Redshift Host]" database = "[Redshift database]" password = "[Redshift password]" user = "[Redshift user]" table = "顧客情報" def run(self): output = self.output() connection = output.connect() cursor = connection.cursor(cursor_factory=psycopg2.extras.DictCursor) try: cursor.execute("update {} SET 氏名 = 'RbkCustomer#000000002' WHERE 顧客ID=3".format(self.table)) cursor.execute("delete from {} WHERE 顧客ID=4".format(self.table)) output.touch(connection) connection.rollback() # commit and clean up connection.close() except: logger.warning("Rallback query error") raise def output(self): return luigi.contrib.redshift.RedshiftTarget( host=self.host, database=self.database, password=self.password, user=self.user, table=self.table, update_id="rollbackRedshiftTable")
実装としては、commitしていた個所をrollbackメソッドに変更しています。
このようにすることで、変更はコミットされず、ロールバックされることになります。
実行結果に付いては割愛します。
まとめ
基本的なクエリの受け渡しもpsycopg2の仕様の問題はありましたが、それ以外は特に問題なく行えそうです。
明日からはRedshiftへのCopyコマンドの実行を行なってみたいと思います。